{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2020 NVIDIA Corporation. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "# =============================================================================="
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# NVTabular demo on Outbrain Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this notebook we train TF Wide & Deep Learning framework using [Kaggle Outbrain dataset](https://www.kaggle.com/c/outbrain-click-prediction). In that competition, ‘Kagglers’ were challenged to predict on which ads and other forms of sponsored content its global users would click. One of  the top finishers' preprocessing and feature engineering pipeline is taken into consideration here, and this pipeline was restructured using NVTabular and cuDF. The Kaggle Outbrain click prediction challenge datasets can be downloaded from [here](https://www.kaggle.com/c/outbrain-click-prediction/data).\n",
    "\n",
    "[Wide & Deep Learning](https://ai.googleblog.com/2016/06/wide-deep-learning-better-together-with.html) refers to a class of networks that use the output of two parts working in parallel - wide model and deep model - to make predictions using categorical and continuous inputs. The wide model is a generalized linear model of features together with their transforms. The deep model in this notebook is a series of 5 hidden MLP layers of 1024 neurons each beginning with a dense embedding of features. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import numpy as np\n",
    "import glob\n",
    "import math\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "import pandas as pd\n",
    "import cudf\n",
    "import cupy\n",
    "from numba import cuda\n",
    "import rmm\n",
    "\n",
    "import nvtabular as nvt\n",
    "from nvtabular.ops import Normalize, FillMedian,FillMissing, Categorify, LogOp, JoinExternal, Dropna, LambdaOp, JoinGroupby, HashBucket, get_embedding_sizes\n",
    "from nvtabular.ops.column_similarity import ColumnSimilarity"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we set where the dataset should be saved once processed (OUTPUT_BUCKET_FOLDER), as well as where the dataset originally resides (DATA_BUCKET_FOLDER)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "OUTPUT_BUCKET_FOLDER = os.environ.get(\"OUTPUT_DATA_DIR\", \"./preprocessed/\")\n",
    "DATA_BUCKET_FOLDER = os.environ.get(\"INPUT_DATA_DIR\", \"/dataset/\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here, we merge the component tables of our dataset into a single data frame, using [cuDF](https://github.com/rapidsai/cudf), which is a GPU DataFrame library for loading, joining, aggregating, filtering, and otherwise manipulating data. We do this because NVTabular applies a workflow to a single table. We also re-initialize managed memory. `rmm.reinitialize()` provides an easy way to initialize RMM (RAPIDS Memory Manager) with specific memory resource options across multiple devices. The reason we re-initialize managed memory here is to allow us to perform memory intensive merge operation. Note that dask-cudf can also be used here."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# %%time\n",
    "from cudf import read_csv\n",
    "\n",
    "# use managed memory for device memory allocation\n",
    "rmm.reinitialize(managed_memory=True)  \n",
    "\n",
    "# Merge all the CSV files together\n",
    "documents_meta = read_csv(DATA_BUCKET_FOLDER + 'documents_meta.csv', na_values=['\\\\N', ''])\n",
    "merged = (read_csv(DATA_BUCKET_FOLDER+'clicks_train.csv', na_values=['\\\\N', ''])\n",
    "             .merge(read_csv(DATA_BUCKET_FOLDER + 'events.csv', na_values=['\\\\N', '']), on=\"display_id\", how=\"left\", suffixes=('', '_event'))\n",
    "             .merge(read_csv(DATA_BUCKET_FOLDER+'promoted_content.csv', na_values=['\\\\N', '']), on=\"ad_id\", how=\"left\", suffixes=('', '_promo'))\n",
    "             .merge(documents_meta, on=\"document_id\", how=\"left\")\n",
    "             .merge(documents_meta, left_on=\"document_id_promo\", right_on=\"document_id\", how=\"left\", suffixes=('', \"_promo\")))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's create the output directories to store the preprocessed parquet files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_train_dir = os.path.join(OUTPUT_BUCKET_FOLDER, 'train/')\n",
    "output_valid_dir = os.path.join(OUTPUT_BUCKET_FOLDER, 'valid/')\n",
    "! mkdir -p $output_train_dir\n",
    "! mkdir -p $output_valid_dir"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We use a time-stratified sample to create a validation set that is more recent, and save both our train and validation sets to parquet files to be read by NVTabular."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Do a stratified split of the merged dataset into a training/validation dataset\n",
    "merged['day_event'] = (merged['timestamp'] / 1000 / 60 / 60 / 24).astype(int)\n",
    "random_state = cudf.Series(cupy.random.uniform(size=len(merged)))\n",
    "valid_set, train_set = merged.scatter_by_map(((merged.day_event <= 10) & (random_state > 0.2)).astype(int)) \n",
    "train_set.to_parquet(OUTPUT_BUCKET_FOLDER+\"train_gdf.parquet\", compression=None)\n",
    "valid_set.to_parquet(OUTPUT_BUCKET_FOLDER+\"valid_gdf.parquet\", compression=None)\n",
    "merged = train_set = valid_set= None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmm.reinitialize(managed_memory=False) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We read in three more cudf data frames, <i>documents categories</i>, <i>topics</i>, and <i>entities</i>, and use them to create sparse matricies in cupy. We wil use these later to calculate cosine similarity between event document (landing page context) and ad document profile vectors (TF-IDF), i.e., how close in profile an ad is to the page that it is being displayed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "documents_categories_cudf = cudf.read_csv(DATA_BUCKET_FOLDER + 'documents_categories.csv')\n",
    "documents_topics_cudf = cudf.read_csv(DATA_BUCKET_FOLDER + 'documents_topics.csv')\n",
    "documents_entities_cudf = cudf.read_csv(DATA_BUCKET_FOLDER + 'documents_entities.csv')\n",
    "# read in document categories/topics/entities as cupy sparse matrices\n",
    "def df_to_coo(df, row=\"document_id\", col=None, data=\"confidence_level\"):\n",
    "    return cupy.sparse.coo_matrix((df[data].values, (df[row].values, df[col].values)))\n",
    "\n",
    "categories = df_to_coo(documents_categories_cudf, col=\"category_id\")\n",
    "topics = df_to_coo(documents_topics_cudf, col=\"topic_id\")\n",
    "documents_entities_cudf['entity_id'] = documents_entities_cudf['entity_id'].astype(\"category\").cat.codes\n",
    "entities = df_to_coo(documents_entities_cudf, col=\"entity_id\")\n",
    "\n",
    "documents_categories_cudf=None\n",
    "documents_topics_cudf =None\n",
    "documents_entities_cudf=None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that our dataset and sparse matrices are created, we can begin laying the groundwork for NVTabular. In this case, categorical columns are treated as integers, and numerical columns are treated as floats. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "NVTabular requires input features to be specified to be either categorical or continuous upon workflow instantiation, so we define our continuous features and categorical features at this step. We also create a function that calculates the time difference between a specified time column (either publish_time or publish_time_promo) and timestamp. This is used to calculate <i>time elapsed since publication</i> between the landing page and the ad."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "CATEGORICAL_COLUMNS =['ad_id', 'document_id', 'platform', 'document_id_promo', 'campaign_id', 'advertiser_id', 'source_id', \n",
    "                      'publisher_id', 'source_id_promo', 'publisher_id_promo', 'geo_location', 'geo_location_country', 'geo_location_state']\n",
    "CONTINUOUS_COLUMNS = ['publish_time', 'publish_time_promo', 'timestamp', 'document_id_promo_clicked_sum_ctr','publisher_id_clicked_sum_ctr',\n",
    "                      'source_id_clicked_sum_ctr', 'document_id_promo_count', 'publish_time_days_since_published', 'ad_id_clicked_sum_ctr',\n",
    "                      'advertiser_id_clicked_sum_ctr','campaign_id_clicked_sum_ctr', 'ad_id_count', 'publish_time_promo_days_since_published']\n",
    "\n",
    "TIMESTAMP_DELTA = 1465876799998\n",
    "\n",
    "def calculate_delta(col,gdf):\n",
    "    col.loc[col == \"\"] = None\n",
    "    col = col.astype('datetime64[ns]')\n",
    "    timestamp = (gdf['timestamp']+TIMESTAMP_DELTA).astype('datetime64[ms]')\n",
    "    delta = (timestamp - col).dt.days\n",
    "    delta = delta * (delta >=0) * (delta<=10*365)\n",
    "    return delta"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With the groundwork laid, we now initiate our workflow, and specify our continuous and categorical columns using the lists we defined above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "workflow = nvt.Workflow(\n",
    "    cat_names=CATEGORICAL_COLUMNS,\n",
    "    cont_names= CONTINUOUS_COLUMNS,\n",
    "    label_name=['clicked'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "At this point, our data still isn’t in a form that’s ideal for consumption by our W&D model. There are missing values, and our categorical variables are still represented by random, discrete identifiers, and need to be transformed into contiguous indices for embedding lookups. The distributions of our continuous variables are uncentered. We also would like to create new features that will help to increase the model accuracy."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's begin to create and process features using NVTabular ops:\n",
    " * <i>geo_location_state</i> and <i>geo_location_country</i> are created by stripping geo_location using the `LambdaOp`\n",
    " * <i>publish_time_days_since_published</i> and <i>publish_time_promo_days_since_published</i> features are created using the `calculate_delta` function in a `LambdaOp`\n",
    " * Click through rate (CTR) statistics for a feature are calculated by first using `JoinGroupby` op to calculate the sum and count of the 'clicked' column grouped on a  specified feature, and then the smoothed CTRs are calculated using the formula:\n",
    " ```\n",
    " def smoothed_ctr(clicks, displays, prior_ctr=0.1, prior_weight=10):\n",
    "    return (clicks + prior_ctr * prior_weight) / (displays + prior_weight)\n",
    "    \n",
    "    prior_ctr and prior_weight are set to 0.1 and 10, respectively.\n",
    "```\n",
    "\n",
    "   \n",
    " * Missing values are filled using 0 or median value depending on the feature using `FillMedian()` and `FillMissing()` ops\n",
    " * Continuous features are log transformed with the `LogOp()` and then normalized to have zero mean and std dev of 1 with `Normalize()` op."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "groupby_columns=['ad_id_count', 'ad_id_clicked_sum', 'source_id_count', 'source_id_clicked_sum', 'document_id_promo_count', 'document_id_promo_clicked_sum', \n",
    "                 'publisher_id_count', 'publisher_id_clicked_sum', 'advertiser_id_count', 'advertiser_id_clicked_sum', 'campaign_id_count', 'campaign_id_clicked_sum']\n",
    "\n",
    "ctr_columns=['advertiser_id_clicked_sum_ctr','document_id_promo_clicked_sum_ctr','publisher_id_clicked_sum_ctr', 'source_id_clicked_sum_ctr', \n",
    "'ad_id_clicked_sum_ctr', 'campaign_id_clicked_sum_ctr']\n",
    "\n",
    "workflow.add_feature([\n",
    "    LambdaOp(\n",
    "        op_name='country',\n",
    "        f=lambda col, gdf: col.str.slice(0,2),\n",
    "        columns=['geo_location'], replace=False),\n",
    "    LambdaOp(\n",
    "        op_name='state',\n",
    "        f=lambda col, gdf: col.str.slice(0,5),\n",
    "        columns=['geo_location'],replace=False),\n",
    "    LambdaOp(\n",
    "        op_name='days_since_published',\n",
    "        f=calculate_delta,\n",
    "        columns=['publish_time','publish_time_promo'], replace=False),\n",
    "    \n",
    "    FillMedian(columns=['publish_time_days_since_published','publish_time_promo_days_since_published']),\n",
    "    \n",
    "    JoinGroupby(columns=['ad_id', 'source_id', 'document_id_promo', 'publisher_id', 'advertiser_id', 'campaign_id'], \n",
    "        cont_names=['clicked'],stats=['sum','count']),\n",
    "    \n",
    "    Categorify(columns=['document_id', 'ad_id', 'source_id', 'source_id_promo','document_id_promo', 'publisher_id', 'publisher_id_promo',\n",
    "                        'advertiser_id','platform', 'geo_location','geo_location_country','geo_location_state','campaign_id'], freq_threshold=0),\n",
    "    #calculate the smoothed ctr\n",
    "    LambdaOp(\n",
    "        op_name='ctr',\n",
    "        f=lambda col, gdf: (col + 0.1 * 10)/(gdf[col.name.replace(\"_clicked_sum\", \"_count\")]+10), \n",
    "        columns = [\"ad_id_clicked_sum\", \"source_id_clicked_sum\", \"document_id_promo_clicked_sum\", \"publisher_id_clicked_sum\", \n",
    "                    \"advertiser_id_clicked_sum\", \"campaign_id_clicked_sum\"], replace=False),\n",
    "    #fill missing values\n",
    "    FillMissing(columns=groupby_columns + ctr_columns),\n",
    "    #apply log1p operation\n",
    "    LogOp(columns=groupby_columns + ['publish_time_days_since_published','publish_time_promo_days_since_published']),\n",
    "    # Standardize the features around 0 with a std of 1\n",
    "    Normalize(columns=groupby_columns) \n",
    "])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below we set all frequency thresholds to 0. The `freq_threshold` param is used for frequency capping. This handy functionality will map all categories which occur in the dataset with some threshold level of infrequency to the _same_ index, keeping the model from overfitting to sparse signals. One can easily create a frequency threshold dictionary, assign a custom threshold value for each categorical feature, and feed that dictionary into the `Categorify` op as `freq_threshold` param."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<i>doc_event_doc_ad_sim_categories</i>, <i>topics</i>, and <i>entities</i> are calculated using the `ColumnSimilarity` op, which utilizes the sparse categories, topics, and entities matrices that were created above to calculate landing page similarity for categories, topics, and entities."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/rapids/notebooks/benf/NVTabular/nvtabular/ops/column_similarity.py:246: RuntimeWarning: divide by zero encountered in true_divide\n",
      "  idf = np.log(N / np.bincount(X.col))\n"
     ]
    }
   ],
   "source": [
    "op = ColumnSimilarity(\"doc_event_doc_ad_sim_categories\", \"document_id\", categories, \"document_id_promo\", metric='tfidf', on_device=False)\n",
    "workflow.add_feature(op)\n",
    "op = ColumnSimilarity(\"doc_event_doc_ad_sim_topics\", \"document_id\", topics, \"document_id_promo\", metric='tfidf', on_device=False)\n",
    "workflow.add_feature(op)\n",
    "op = ColumnSimilarity(\"doc_event_doc_ad_sim_entities\", \"document_id\", entities, \"document_id_promo\", metric='tfidf', on_device=False)\n",
    "workflow.add_feature(op)\n",
    "\n",
    "workflow.finalize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " We then create an NVTabular Dataset object for both train and validation. We apply our <i>Workflow</i> to our datasets and save the results out to parquet files for fast reading at train time. We also measure and record statistics on our training set using the `record_stats=True` parameter so that our <i>Workflow</i> can use them at apply time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_dataset = nvt.Dataset(OUTPUT_BUCKET_FOLDER+'train_gdf.parquet', part_mem_fraction=0.12)\n",
    "valid_dataset = nvt.Dataset(OUTPUT_BUCKET_FOLDER+'valid_gdf.parquet', part_mem_fraction=0.12)\n",
    "\n",
    "workflow.apply(train_dataset, record_stats=True, output_path=output_train_dir, shuffle=True, out_files_per_proc=5)\n",
    "workflow.apply(valid_dataset, record_stats=False, output_path=output_valid_dir, shuffle=False, out_files_per_proc=5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that our data is preprocessed and saved out, we can leverage datasets to read through the preprocessed parquet files in an online fashion to train neural networks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_paths = sorted(glob.glob(os.path.join(output_train_dir, '*.parquet')))\n",
    "valid_paths = sorted(glob.glob(os.path.join(output_valid_dir, '*.parquet')))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "EMBEDDING_TABLE_SHAPES defines the size of the embedding tables that our model will use to map categorical outputs from NVTabular into numeric dense inputs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'ad_id': (418243, 16),\n",
       " 'advertiser_id': (4055, 16),\n",
       " 'campaign_id': (31382, 16),\n",
       " 'document_id': (693439, 16),\n",
       " 'document_id_promo': (144032, 16),\n",
       " 'geo_location': (2886, 16),\n",
       " 'geo_location_country': (232, 16),\n",
       " 'geo_location_state': (2486, 16),\n",
       " 'platform': (5, 4),\n",
       " 'publisher_id': (483, 16),\n",
       " 'publisher_id_promo': (817, 16),\n",
       " 'source_id': (4738, 16),\n",
       " 'source_id_promo': (6819, 16)}"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# here we use default embedding size rule defined within NVTabular.\n",
    "EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(workflow)\n",
    "EMBEDDING_TABLE_SHAPES"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can save the stats from the workflow and load it anytime, so we can run training without doing preprocessing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "workflow.save_stats('stats_wnd_workflow')\n",
    "# uncomment to load the workflow stats\n",
    "#workflow.load_stats('stats_wnd_workflow')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We select certain categorical and numerical features that are processed and generated via the NVTabular workflow to train our W&D TF model. Below, we'll exclude three features from the CONTINUOUS_COLUMNS list, and use the rest as cont input features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "exclude_conts= ['publish_time', 'publish_time_promo', 'timestamp']\n",
    "NUMERIC_COLUMNS = [col for col in CONTINUOUS_COLUMNS if col not in exclude_conts]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['document_id_promo_clicked_sum_ctr',\n",
       " 'publisher_id_clicked_sum_ctr',\n",
       " 'source_id_clicked_sum_ctr',\n",
       " 'document_id_promo_count',\n",
       " 'publish_time_days_since_published',\n",
       " 'ad_id_clicked_sum_ctr',\n",
       " 'advertiser_id_clicked_sum_ctr',\n",
       " 'campaign_id_clicked_sum_ctr',\n",
       " 'ad_id_count',\n",
       " 'publish_time_promo_days_since_published']"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "NUMERIC_COLUMNS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.3.0\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "print(tf.__version__)\n",
    "from tensorflow.python.feature_column import feature_column_v2 as fc\n",
    "\n",
    "# we can control how much memory to give tensorflow with this environment variable\n",
    "# IMPORTANT: make sure you do this before you initialize TF's runtime, otherwise\n",
    "# TF will have claimed all free GPU memory\n",
    "#os.environ['TF_MEMORY_ALLOCATION'] = \"0.8\" # fraction of free memory\n",
    "from nvtabular.loader.tensorflow import KerasSequenceLoader, KerasSequenceValidater\n",
    "from nvtabular.framework_utils.tensorflow import layers\n",
    "from tensorflow.python.feature_column import feature_column_v2 as fc"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We create tensorflow feature columns corresponding to each feature of the model input. If you're using NVTabular with TensorFlow feature_columns, you should only be using `tf.feature_column.categorical_column_with_identity` for categorical features, since any other transformation (categorification and/or hashing) should be handled in NVTabular on the GPU. This feature column is passed to the wide portion of the model. If a categorical column corresponds to an embedding table, it is wrapped with an embedding_column feature_column, if it does not correspond to an embedding table, it is wrapped as an indicator column. The wrapped column is passed to the deep portion of the model. Continuous columns are passed to both the wide and deep portions of the model after being encapsulated as a `numeric_column`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_feature_columns():\n",
    "    wide_columns, deep_columns = [], []\n",
    "\n",
    "    for column_name in CATEGORICAL_COLUMNS:\n",
    "        if column_name in EMBEDDING_TABLE_SHAPES: # Changing hashing to identity + adding modulo to dataloader\n",
    "            categorical_column = tf.feature_column.categorical_column_with_identity(\n",
    "                column_name, num_buckets=EMBEDDING_TABLE_SHAPES[column_name][0])\n",
    "        else:\n",
    "            raise ValueError(f'Unexpected categorical column found {column_name}')\n",
    "\n",
    "        if column_name in EMBEDDING_TABLE_SHAPES:\n",
    "            wrapped_column = tf.feature_column.embedding_column(\n",
    "                categorical_column,\n",
    "                dimension=EMBEDDING_TABLE_SHAPES[column_name][1],\n",
    "                combiner='mean')\n",
    "        else:\n",
    "            wrapped_column = tf.feature_column.indicator_column(categorical_column)\n",
    "            \n",
    "        wide_columns.append(categorical_column)\n",
    "        deep_columns.append(wrapped_column)\n",
    "    \n",
    "    numerics = [tf.feature_column.numeric_column(column_name, shape=(1,),dtype=tf.float32) \n",
    "                for column_name in NUMERIC_COLUMNS]\n",
    "    \n",
    "    wide_columns.extend(numerics)\n",
    "    deep_columns.extend(numerics)\n",
    "       \n",
    "    return wide_columns, deep_columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we define the layer shape and dropout probability for the deep portion of the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "deep_hidden_units=[1024,1024,1024,1024,1024]\n",
    "deep_dropout=.1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "An input is created for each feature column, with a datatype of either tf.float32 for continuous values, or tf.int32 for categorical values. To implement the wide model, for categorical inputs, we embed them to a dimension of one, and sum them with the results of applying a dense layer with output dimension one, effectively weighting and summing each of the inputs. For the deep model, we embed our categorical columns according to the feature columns we defined earlier, and concatenate the newly dense features with our dense continuous features, which we pass to our deep model, which by default is a 5 layer MLP with internal dimension of 1024 neurons for each layer. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ad_id\n",
      "document_id\n",
      "platform\n",
      "document_id_promo\n",
      "campaign_id\n",
      "advertiser_id\n",
      "source_id\n",
      "publisher_id\n",
      "source_id_promo\n",
      "publisher_id_promo\n",
      "geo_location\n",
      "geo_location_country\n",
      "geo_location_state\n",
      "document_id_promo_clicked_sum_ctr\n",
      "publisher_id_clicked_sum_ctr\n",
      "source_id_clicked_sum_ctr\n",
      "document_id_promo_count\n",
      "publish_time_days_since_published\n",
      "ad_id_clicked_sum_ctr\n",
      "advertiser_id_clicked_sum_ctr\n",
      "campaign_id_clicked_sum_ctr\n",
      "ad_id_count\n",
      "publish_time_promo_days_since_published\n"
     ]
    }
   ],
   "source": [
    "wide_columns, deep_columns = get_feature_columns()\n",
    "\n",
    "wide_weighted_outputs = []  # a list of (batch_size, 1) contributors to the linear weighted sum\n",
    "numeric_dense_inputs = []  # NumericColumn inputs; to be concatenated and then fed to a dense layer\n",
    "wide_columns_dict = {}  # key : column\n",
    "deep_columns_dict = {}  # key : column\n",
    "features = {}  # tf.keras.Input placeholders for each feature to be used\n",
    "\n",
    "# construct input placeholders for wide features\n",
    "for col in wide_columns:\n",
    "    print(col.key)\n",
    "    features[col.key] = tf.keras.Input(shape=(1,),\n",
    "                                       batch_size=None, \n",
    "                                       name=col.key,\n",
    "                                       dtype=tf.float32 if col.key in NUMERIC_COLUMNS else tf.int32,\n",
    "                                       sparse=False)\n",
    "    wide_columns_dict[col.key] = col\n",
    "for col in deep_columns:\n",
    "    is_embedding_column = ('key' not in dir(col))\n",
    "    key = col.categorical_column.key if is_embedding_column else col.key\n",
    "\n",
    "    if key not in features:\n",
    "        features[key] = tf.keras.Input(shape=(1,), \n",
    "                                       batch_size=None, \n",
    "                                       name=key, \n",
    "                                       dtype=tf.float32 if col.key in NUMERIC_COLUMNS else tf.int32,\n",
    "                                       sparse=False)\n",
    "    deep_columns_dict[key] = col\n",
    "\n",
    "for key in wide_columns_dict:\n",
    "    if key in EMBEDDING_TABLE_SHAPES: \n",
    "        wide_weighted_outputs.append(tf.keras.layers.Flatten()(tf.keras.layers.Embedding(\n",
    "            EMBEDDING_TABLE_SHAPES[key][0], 1, input_length=1)(features[key])))\n",
    "    else:\n",
    "        numeric_dense_inputs.append(features[key])\n",
    "\n",
    "categorical_output_contrib = tf.keras.layers.add(wide_weighted_outputs,\n",
    "                                                 name='categorical_output')\n",
    "numeric_dense_tensor = tf.keras.layers.concatenate(\n",
    "    numeric_dense_inputs, name='numeric_dense')\n",
    "deep_columns = list(deep_columns_dict.values())\n",
    "\n",
    "dnn = layers.DenseFeatures(deep_columns, name='deep_embedded')(features)\n",
    "for unit_size in deep_hidden_units:\n",
    "    dnn = tf.keras.layers.Dense(units=unit_size,activation='relu')(dnn)\n",
    "    dnn = tf.keras.layers.Dropout(rate=deep_dropout)(dnn)\n",
    "    dnn = tf.keras.layers.BatchNormalization(momentum=.999)(dnn)\n",
    "dnn = tf.keras.layers.Dense(units=1)(dnn)\n",
    "dnn_model = tf.keras.Model(inputs=features,\n",
    "                           outputs=dnn)\n",
    "linear_output = categorical_output_contrib + tf.keras.layers.Dense(1)(numeric_dense_tensor)\n",
    "\n",
    "linear_model = tf.keras.Model(inputs=features,\n",
    "                              outputs=linear_output)\n",
    "\n",
    "wide_and_deep_model = tf.keras.experimental.WideDeepModel(\n",
    "    linear_model, dnn_model, activation='sigmoid')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We define the datasets that will be used to ingest data into our model. In this case, the NVTabular dataloaders take a set of parquet files generated by NVTabular as input, and are capable of accelerated throughput. The `KerasSequenceLoader` manages shuffling by loading in chunks of data from different parts of the full dataset, concatenating them and then shuffling, then iterating through this super-chunk sequentially in batches. The number of \"parts\" of the dataset that get sample, or \"partitions\", is controlled by the <i>parts_per_chunk</i> parameter, while the size of each one of these parts is controlled by the <i>buffer_size</i> parameter, which refers to a fraction of available GPU memory. Using more chunks leads to better randomness, especially at the epoch level where physically disparate samples can be brought into the same batch, but can impact throughput if you use too many.\n",
    "\n",
    "The validation process gets slightly complicated by the fact that <i>model.fit</i> doesn't accept Keras Sequence objects as validation data. To support this, we also define a KerasSequenceValidater, a lightweight Keras callback to handle validation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "TRAIN_PATHS = sorted(glob.glob('./preprocessed/train/*.parquet'))\n",
    "VALID_PATHS = sorted(glob.glob('./preprocessed/valid/*.parquet'))\n",
    "train_dataset_tf = KerasSequenceLoader(\n",
    "    TRAIN_PATHS, # you could also use a glob pattern\n",
    "    batch_size=131072,\n",
    "    label_names=['clicked'],\n",
    "    cat_names=CATEGORICAL_COLUMNS,\n",
    "    cont_names=NUMERIC_COLUMNS,\n",
    "    engine='parquet',\n",
    "    shuffle=True,\n",
    "    buffer_size=0.06, # how many batches to load at once\n",
    "    parts_per_chunk=1\n",
    ")\n",
    "\n",
    "valid_dataset_tf = KerasSequenceLoader(\n",
    "    VALID_PATHS, # you could also use a glob pattern\n",
    "    batch_size=131072,\n",
    "    label_names=['clicked'],\n",
    "    cat_names = CATEGORICAL_COLUMNS,\n",
    "    cont_names=NUMERIC_COLUMNS,\n",
    "    engine='parquet',\n",
    "    shuffle=False,\n",
    "    buffer_size=0.06,\n",
    "    parts_per_chunk=1\n",
    ")\n",
    "validation_callback = KerasSequenceValidater(valid_dataset_tf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The wide portion of the model is optimized using the <i>Follow The Regularized Leader (FTRL)</i> algorithm, while the deep portion of the model is optimized using <i>Adam</i> optimizer."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "wide_optimizer = tf.keras.optimizers.Ftrl(\n",
    "        learning_rate=0.1,\n",
    ")\n",
    "\n",
    "deep_optimizer = tf.keras.optimizers.Adam(\n",
    "        learning_rate=0.2\n",
    ") "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we compile our model with our dual optimizers and binary cross-entropy loss, and train our model for 20 epochs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "WARNING:tensorflow:Layer wide_deep_model is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because its dtype defaults to floatx.\n",
      "\n",
      "If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.\n",
      "\n",
      "To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.\n",
      "\n",
      "Epoch 1/20\n",
      "456/456 [==============================] - 286s 627ms/step - loss: 0.7433 - binary_accuracy: 0.8014 - auc: 0.6933 - val_loss: 0.7407 - val_binary_accuracy: 0.8042 - val_auc: 0.6149\n",
      "Epoch 2/20\n",
      "456/456 [==============================] - 285s 626ms/step - loss: 0.4293 - binary_accuracy: 0.8147 - auc: 0.7438 - val_loss: 0.4303 - val_binary_accuracy: 0.8129 - val_auc: 0.6342\n",
      "Epoch 3/20\n",
      "456/456 [==============================] - 286s 626ms/step - loss: 0.4236 - binary_accuracy: 0.8168 - auc: 0.7534 - val_loss: 0.4249 - val_binary_accuracy: 0.8152 - val_auc: 0.6821\n",
      "Epoch 4/20\n",
      "218/456 [=============>................] - ETA: 2:07 - loss: 0.4184 - binary_accuracy: 0.8189 - auc: 0.7619"
     ]
    }
   ],
   "source": [
    "wide_and_deep_model.compile(\n",
    "    optimizer=[wide_optimizer, deep_optimizer],\n",
    "    loss='binary_crossentropy',\n",
    "    metrics=[tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.AUC()],\n",
    "    experimental_run_tf_function=False\n",
    ")\n",
    "history = wide_and_deep_model.fit(train_dataset_tf, callbacks=[validation_callback], epochs=20)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
